#
# Copyright 2012-2013 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import socket

import msgpack
import oslo.messaging
from oslo_config import cfg
from oslo_utils import netutils
from oslo_utils import timeutils
from oslo_utils import units

from ceilometer import dispatcher
from ceilometer.event.storage import models
from ceilometer import messaging
from ceilometer.i18n import _, _LE
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import utils

# Options owned by this module; they are registered below under the
# [collector] group and read by CollectorService and the endpoint classes.
OPTS = [
    # An empty value disables the UDP listener (see CollectorService.start).
    cfg.StrOpt('udp_address',
               default='0.0.0.0',
               help='Address to which the UDP socket is bound. Set to '
               'an empty string to disable.'),
    cfg.IntOpt('udp_port',
               default=4952,
               help='Port to which the UDP socket is bound.'),
    # The two flags below are passed as allow_requeue to the notification
    # listeners and as requeue_on_error to the matching endpoint.
    cfg.BoolOpt('requeue_sample_on_dispatcher_error',
                default=False,
                help='Requeue the sample on the collector sample queue '
                'when the collector fails to dispatch it. This is only valid '
                'if the sample come from the notifier publisher.'),
    cfg.BoolOpt('requeue_event_on_dispatcher_error',
                default=False,
                help='Requeue the event on the collector event queue '
                'when the collector fails to dispatch it.'),
]

cfg.CONF.register_opts(OPTS, group="collector")
# Options declared by other modules but read here: the RPC and notifier
# topics used to build the listeners, and the flag that decides whether an
# event listener is created at all.
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
                    group='publisher_rpc')
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
                    group='publisher_notifier')
cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
                    group='publisher_notifier')
cfg.CONF.import_opt('store_events', 'ceilometer.notification',
                    group='notification')


# Module-level logger shared by the service and the endpoints.
LOG = log.getLogger(__name__)


class CollectorService(os_service.Service):
    """Listener for the collector service.

    Samples reach the collector through up to three channels, all of which
    hand their data to the dispatchers loaded in :meth:`start`:

    * msgpack-encoded UDP datagrams (see :meth:`start_udp`),
    * the RPC server, whose endpoint is this object
      (see :meth:`record_metering_data`),
    * notification listeners for samples and, optionally, events.
    """

    # Class-level defaults so that stop() is safe even when start() was never
    # called, or failed before it could assign these attributes (for
    # instance if loading the dispatchers raised).
    rpc_server = None
    sample_listener = None
    event_listener = None
    udp_run = False

    def start(self):
        """Load the dispatchers and start every configured listener.

        The UDP receiver is only started when ``collector.udp_address`` is
        non-empty. The RPC server and the notification listeners are only
        created when ``messaging.get_transport(optional=True)`` returns a
        transport; the event listener additionally requires
        ``notification.store_events`` to be set.
        """
        # ensure dispatcher is configured before starting other services
        self.dispatcher_manager = dispatcher.load_dispatcher_manager()
        self.rpc_server = None
        self.sample_listener = None
        self.event_listener = None
        super(CollectorService, self).start()

        if cfg.CONF.collector.udp_address:
            self.tg.add_thread(self.start_udp)

        transport = messaging.get_transport(optional=True)
        if transport:
            # This service object is itself the RPC endpoint.
            self.rpc_server = messaging.get_rpc_server(
                transport, cfg.CONF.publisher_rpc.metering_topic, self)

            sample_target = oslo.messaging.Target(
                topic=cfg.CONF.publisher_notifier.metering_topic)
            self.sample_listener = messaging.get_notification_listener(
                transport, [sample_target],
                [SampleEndpoint(self.dispatcher_manager)],
                allow_requeue=(cfg.CONF.collector.
                               requeue_sample_on_dispatcher_error))

            if cfg.CONF.notification.store_events:
                event_target = oslo.messaging.Target(
                    topic=cfg.CONF.publisher_notifier.event_topic)
                self.event_listener = messaging.get_notification_listener(
                    transport, [event_target],
                    [EventEndpoint(self.dispatcher_manager)],
                    allow_requeue=(cfg.CONF.collector.
                                   requeue_event_on_dispatcher_error))
                self.event_listener.start()

            self.rpc_server.start()
            self.sample_listener.start()

            if not cfg.CONF.collector.udp_address:
                # Add a dummy thread to have wait() working
                self.tg.add_timer(604800, lambda: None)

    def start_udp(self):
        """Receive msgpack-encoded samples over UDP and dispatch them.

        Binds a datagram socket to ``collector.udp_address`` /
        ``collector.udp_port`` (IPv6 when the address is a valid IPv6
        address, IPv4 otherwise) and loops until ``self.udp_run`` is
        cleared. Datagrams that cannot be decoded are logged and skipped;
        dispatcher failures are logged and do not stop the loop.
        """
        address_family = socket.AF_INET
        if netutils.is_valid_ipv6(cfg.CONF.collector.udp_address):
            address_family = socket.AF_INET6
        udp = socket.socket(address_family, socket.SOCK_DGRAM)
        try:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            udp.bind((cfg.CONF.collector.udp_address,
                      cfg.CONF.collector.udp_port))

            self.udp_run = True
            while self.udp_run:
                # NOTE(jd) Arbitrary limit of 64K because that ought to be
                # enough for anybody.
                data, source = udp.recvfrom(64 * units.Ki)
                try:
                    sample = msgpack.loads(data, encoding='utf-8')
                except Exception:
                    LOG.warning(_("UDP: Cannot decode data sent by %s"),
                                source)
                else:
                    try:
                        LOG.debug(_("UDP: Storing %s"), sample)
                        self.dispatcher_manager.map_method(
                            'record_metering_data', sample)
                    except Exception:
                        LOG.exception(_("UDP: Unable to store meter"))
        finally:
            # Release the socket whether the loop ended normally, bind()
            # failed, or this thread was terminated while receiving.
            udp.close()

    def stop(self):
        """Stop the UDP loop, the RPC server and the notification listeners.

        Safe to call even if :meth:`start` never completed: listeners that
        were not created are simply skipped.
        """
        # The UDP loop checks this flag before each receive.
        self.udp_run = False
        if self.rpc_server:
            self.rpc_server.stop()
        if self.sample_listener:
            utils.kill_listeners([self.sample_listener])
        if self.event_listener:
            utils.kill_listeners([self.event_listener])
        super(CollectorService, self).stop()

    def record_metering_data(self, context, data):
        """RPC endpoint for messages we send to ourselves.

        When the notification messages are re-published through the
        RPC publisher, this method receives them for processing.

        :param context: RPC request context (unused here).
        :param data: metering data forwarded to every dispatcher's
                     ``record_metering_data`` method.
        """
        self.dispatcher_manager.map_method('record_metering_data', data=data)


class CollectorEndpoint(object):
    """Base notification endpoint that forwards payloads to dispatchers.

    Subclasses are expected to provide two class attributes: ``method``,
    the dispatcher method name to invoke, and ``ep_type``, a short label
    used in the failure log message.
    """

    def __init__(self, dispatcher_manager, requeue_on_error):
        self.requeue_on_error = requeue_on_error
        self.dispatcher_manager = dispatcher_manager

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        """Hand a notification payload over to the dispatchers.

        The payload is passed to ``self.method`` on every dispatcher. If
        that raises and ``requeue_on_error`` is set, the failure is logged
        and a requeue result is returned; otherwise the exception
        propagates to the caller.
        """
        try:
            self.dispatcher_manager.map_method(self.method, payload)
        except Exception:
            # Without requeueing enabled the error is the caller's problem.
            if not self.requeue_on_error:
                raise
            LOG.exception(_LE("Dispatcher failed to handle the %s, "
                              "requeue it."), self.ep_type)
            return oslo.messaging.NotificationResult.REQUEUE


class SampleEndpoint(CollectorEndpoint):
    """Endpoint that records sample notifications as metering data."""

    ep_type = 'sample'
    method = 'record_metering_data'

    def __init__(self, dispatcher_manager):
        # Requeue behaviour follows the sample-specific collector option.
        requeue = cfg.CONF.collector.requeue_sample_on_dispatcher_error
        super(SampleEndpoint, self).__init__(dispatcher_manager, requeue)


class EventEndpoint(CollectorEndpoint):
    """Endpoint that rebuilds event models and records them."""

    ep_type = 'event'
    method = 'record_events'

    def __init__(self, dispatcher_manager):
        # Requeue behaviour follows the event-specific collector option.
        requeue = cfg.CONF.collector.requeue_event_on_dispatcher_error
        super(EventEndpoint, self).__init__(dispatcher_manager, requeue)

    @staticmethod
    def _build_event(raw_event):
        """Build a ``models.Event`` from one entry of the payload.

        The entry must provide ``message_id``, ``event_type``,
        ``generated`` (parsed as an ISO time and normalized) and
        ``traits`` (an iterable of name/dtype/value triples); ``raw`` is
        optional and defaults to an empty dict.
        """
        message_id = raw_event['message_id']
        kind = raw_event['event_type']
        generated = timeutils.normalize_time(
            timeutils.parse_isotime(raw_event['generated']))
        traits = [
            models.Trait(name, dtype,
                         models.Trait.convert_value(dtype, value))
            for name, dtype, value in raw_event['traits']
        ]
        return models.Event(message_id=message_id,
                            event_type=kind,
                            generated=generated,
                            traits=traits,
                            raw=raw_event.get('raw', {}))

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        """Convert the payload entries to events and dispatch them.

        Entries that cannot be converted are logged and dropped; the
        remaining events are passed on to the parent implementation.
        """
        converted = []
        for raw_event in payload:
            try:
                event = self._build_event(raw_event)
            except Exception:
                LOG.exception(_LE("Error processing event and it will be "
                                  "dropped: %s"), raw_event)
            else:
                converted.append(event)
        return super(EventEndpoint, self).sample(
            ctxt, publisher_id, event_type, converted, metadata)
